{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Why use Azure machine learning(ML) pipelines to run your flows on the cloud?\n",
    "In real-world scenarios, flows serve various purposes. For example, consider a flow designed to evaluate the relevance score for a communication session between humans and agents. Suppose you want to trigger this flow every night to assess today’s performance and avoid peak hours for LLM (Language Model) endpoints. In this common scenario, people often encounter the following needs:\n",
    "- Handling Large Data Inputs: Running flows with thousands or millions of data inputs at once.\n",
    "- Scalability and Efficiency: Requiring a scalable, efficient, and resilient platform to ensure success.\n",
    "- Automations: Automatically triggering batch flows when upstream data is ready or at fixed intervals.\n",
    "\n",
    "__Azure ML pipelines__ address all these offline requirements effectively. With the integration of prompt flows and Azure ML pipeline, flow users could very easily achieve above goals and in this tutorial, you can learn:\n",
    "- How to use python SDK to automatically convert your flow into a 'step' in Azure ML pipeline.\n",
    "- How to feed your data into pipeline to trigger the batch flow runs.\n",
    "- How to build other pipeline steps ahead or behind your prompt flow step. e.g. data preprocessing or result aggregation.\n",
    "- How to setup a simple scheduler on my pipeline.\n",
    "- How to deploy pipeline to an Azure ML batch endpoint. Then I can invoke it with new data when needed.\n",
    "\n",
    "Before you begin, consider the following prerequisites:\n",
    "- Introduction to Azure ML Platform:\n",
    "    - [Core site of Azure ML platform](https://learn.microsoft.com/en-us/azure/machine-learning/overview-what-is-azure-machine-learning?view=azureml-api-2).\n",
    "    - Understand what [Azure ML pipelines](https://learn.microsoft.com/en-us/azure/machine-learning/concept-ml-pipelines?view=azureml-api-2) and [component](https://learn.microsoft.com/en-us/azure/machine-learning/concept-component?view=azureml-api-2) are.\n",
    "- Azure cloud setup:\n",
    "    - An Azure account with an active subscription - [Create an account for free](https://azure.microsoft.com/free/?WT.mc_id=A261C142F)\n",
    "    - Create an Azure ML resource from Azure portal - [Create a Azure ML workspace](https://ms.portal.azure.com/#view/Microsoft_Azure_Marketplace/MarketplaceOffersBlade/searchQuery/machine%20learning)\n",
    "    - Connect to your workspace then setup a basic computer cluster - [Configure workspace](../../configuration.ipynb)\n",
    "- Local environment setup:\n",
    "    - A python environment\n",
    "    - Installed Azure Machine Learning Python SDK v2 - [install instructions](../../../README.md) - check the getting started section and make sure version of 'azure-ai-ml' is higher than `1.12.0`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 1. Connect to Azure Machine Learning Workspace\n",
    "\n",
    "The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.\n",
    "\n",
    "## 1.1 Import the required libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import required libraries\n",
    "from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential\n",
    "from azure.ai.ml import MLClient, load_component, Input, Output\n",
    "from azure.ai.ml.constants import AssetTypes\n",
    "from azure.ai.ml.dsl import pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1.2 Configure credential\n",
    "\n",
    "We are using `DefaultAzureCredential` to get access to workspace. \n",
    "`DefaultAzureCredential` should be capable of handling most Azure SDK authentication scenarios. \n",
    "\n",
    "Reference for more available credentials if it does not work for you: [configure credential example](../../configuration.ipynb), [azure-identity reference doc](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity?view=azure-python)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    credential = DefaultAzureCredential()\n",
    "    # Check if given credential can get token successfully.\n",
    "    credential.get_token(\"https://management.azure.com/.default\")\n",
    "except Exception as ex:\n",
    "    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work\n",
    "    credential = InteractiveBrowserCredential()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1.3 Get a handle to the workspace\n",
    "\n",
    "We use 'config file' to connect to your workspace. Check [this notebook](../../configuration.ipynb) to get your config file from Azure ML workspace portal and paste it into this folder. Then if you pass the next code block, you've all set for the environment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get a handle to workspace\n",
    "ml_client = MLClient.from_config(credential=credential)\n",
    "\n",
    "# Retrieve an already attached Azure Machine Learning Compute.\n",
    "cluster_name = \"cpu-cluster\"\n",
    "print(ml_client.compute.get(cluster_name))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 2. Load flow as component\n",
    "If you’ve already authored a flow using the Promptflow SDK or portal, you can locate the flow.dag.yaml file within the flow folder. This YAML specification is essential for loading your flow into an Azure ML component.\n",
    "\n",
    "> __REMARK:__ To use `load_component` function with flow.dag.yaml, please ensure the following:</br>\n",
    "> - The `$schema` should be defined in target DAG yaml file. For example: `$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json`.</br>\n",
    "> - Flow metadata must be generated and kept up-to-date by verifying the file '<my-flow-directory>/.promptflow/flow.tools.json'. If it doesn't exist, run the following command to generate and update it: `pf flow validate --file <my-flow-directory>`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "flow_component = load_component(\"../../flows/standard/web-classification/flow.dag.yaml\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When using the `load_component` function and the flow YAML specification, your flow is automatically transformed into a __[parallel component](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-use-parallel-job-in-pipeline?view=azureml-api-2&tabs=cliv2#why-are-parallel-jobs-needed)__. This parallel component is designed for large-scale, offline, parallelized processing with efficiency and resilience. Here are some key features of this auto-converted component:\n",
    "\n",
    " - Pre-defined input and output ports:\n",
    "\n",
    "| port name | type | description |\n",
    "| --------- | ---- | ----------- |\n",
    "| data | uri_folder or uri_file | Accepts batch data input to your flow. You can use either the `uri_file` data type if your data is a single file or the `uri_folder` data type if your folder contains multiple files with the same schema. The default data type is jsonl, but you can customize this setting after declaring an instance of this flow component in your pipeline. Note that your data will be converted into a dataframe, so ensure that your CSV or TSV data includes a header line for proper mapping. |\n",
    "| flow_outputs | uri_file | Generates a single output file named parallel_run_step.jsonl. Each line in this data file corresponds to a JSON object representing the flow returns, along with an additional column called line_number indicating its position from the original file. |\n",
    "| debug_info | uri_folder | If you run your flow component in __debug mode__, this port provides debugging information for each run of your lines. E.g. intermediate outputs between steps, or LLM response and token usage. |\n",
    "\n",
    "![prompt flow base component image](../../../docs/media/cloud/flow-in-pipeline/pf-base-component.png)\n",
    "    \n",
    " - Auto-generated parameters \n",
    " \n",
    "   These parameters represent all your flow inputs and connections associated with your flow steps. You can set default values in the flow/run definition, and they can be further customized during job submission. Use '[web-classification](../../flows/standard/web-classification/flow.dag.yaml)' sample flow for example, this flow has only one input named 'url' and 2 LLM steps 'summarize_text_content' and 'classify_with_llm'. The input parameters of this flow component are:\n",
    " \n",
    "   ![prompt flow base component image](../../../docs/media/cloud/flow-in-pipeline/pf-component-parameters.png)\n",
    "\n",
    " - Auto-generated environment\n",
    "\n",
    "   The environment of the created component will be inherited by latest promptflow runtime image. User can include custom packages in the environment by specifying the `environment` attribute in `flow.dag.yaml`, along with a 'requirements.txt' file located under the same flow folder:\n",
    "   ```yaml\n",
    "      ...\n",
    "      environment:\n",
    "         python_requirements_txt: requirements.txt\n",
    "   ```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 3. Build your pipeline\n",
    "## 3.1 Declare input and output\n",
    "To supply your pipeline with data, you need to declare an input using the `path`, `type`, and `mode` properties. Please note: `mount` is the default and suggested mode for your file or folder data input.\n",
    "\n",
    "Declaring the pipeline output is optional. However, if you require a customized output path in the cloud, you can follow the example below to set the path on the datastore. For more detailed information on valid path values, refer to this documentation - [manage pipeline inputs outputs](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-manage-inputs-outputs-pipeline?view=azureml-api-2&tabs=cli#path-and-mode-for-data-inputsoutputs)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_input = Input(\n",
    "    path=\"../../flows/standard/web-classification/data.jsonl\",\n",
    "    type=AssetTypes.URI_FILE,\n",
    "    mode=\"mount\",\n",
    ")\n",
    "\n",
    "pipeline_output = Output(\n",
    "    # Provide custom flow output file path if needed\n",
    "    # path=\"azureml://datastores/<data_store_name>/paths/<path>\",\n",
    "    type=AssetTypes.URI_FOLDER,\n",
    "    # rw_mount is suggested for flow output\n",
    "    mode=\"rw_mount\",\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 3.2.1 Run pipeline with single flow component\n",
    "Since all Promptflow components are based on Azure ML parallel components, users can leverage specific __run settings__  to control the parallelization of flow runs. Below are some useful settings:\n",
    "\n",
    "| run settings | description | allowed values | default value |\n",
    "| ------------ | ----------- | -------------- | ------------- |\n",
    "| PF_INPUT_FORMAT | When utilizing `uri_folder` as the input data, this setting allows you to specify which file extensions should be treated as data files for initializing flow runs. | json, jsonl, csv, tsv | jsonl |\n",
    "| compute | Defines which compute cluster from your Azure ML workspace will be used for this job. | | |\n",
    "| instance_count | Define how many nodes from your compute cluster will be assigned to this job. | from 1 to node count of compute cluster. | 1 |\n",
    "| max_concurrency_per_instance | Defines how many dedicated processors will run the flow in parallel on 1 node. When combined with the 'instance_count' setting, the total parallelization of your flow will be instance_count*max_concurrency_per_instance.| >1 | 1 |\n",
    "| mini_batch_size | Define the number of lines for each mini-batches. A __mini-batch__ is the basic granularity for processing full data with parallelization. Each worker processor handles one mini-batch at a time, and all workers work in parallel across different nodes. | > 0 | 1 |\n",
    "| max_retries | Defines the retry count if any mini-batch encounters an inner exception. </br></br> Remark: The retry granularity is based on mini-batches. For instance, with the previous setting, you can set 100 lines per mini-batch. When one line execution encounters a transient issue or an unhandled exception, these 100 lines will be retried together, even if the remaining 99 lines are successful. Additionally, LLM responses with status code 429 will be handled internally for flow runs in most cases and will not trigger mini-batch failure. | >= 0 | 3 |\n",
    "| error_threshold | Defines how many failed lines are acceptable. If the count of failed lines exceeds this threshold, the job will be stopped and marked as failed. Set '-1' to disable this failure check. | -1 or >=0 | -1 |\n",
    "| mini_batch_error_threshold | Defines the maximum number of failed mini-batches that can be tolerated after all retries. Set '-1' to disable this failure check. | -1 or >=0 | -1 |\n",
    "| logging_level | Determines how parallel jobs save logs to disk. Setting to ‘DEBUG’ for the flow component allows the component to output intermediate flow logs into the ‘debug_info’ port. | INFO, WARNING, DEBUG | INFO |\n",
    "| timeout | Sets the timeout checker for each mini-batch execution in milliseconds. If a mini-batch runs longer than this threshold, it will be marked as failed and trigger the next retry. Consider setting a higher value based on your mini-batch size and total traffic throughput for your LLM endpoints. | > 0 | 600 |\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the pipeline as a function\n",
    "@pipeline()\n",
    "def pipeline_func_with_flow(\n",
    "    # Function inputs will be treated as pipeline input data or parameters.\n",
    "    # Pipeline input could be linked to step inputs to pass data between steps.\n",
    "    # Users are not required to define pipeline inputs.\n",
    "    # With pipeline inputs, user can provide the different data or values when they trigger different pipeline runs.\n",
    "    pipeline_input_data: Input,\n",
    "    parallel_node_count: int = 1,\n",
    "):\n",
    "    # Declare pipeline step 'flow_node' by using flow component\n",
    "    flow_node = flow_component(\n",
    "        # Bind the pipeline intput data to the port 'data' of the flow component\n",
    "        # If you don't have pipeline input, you can directly pass the 'data_input' object to the 'data'\n",
    "        # But with this approach, you can't provide different data when you trigger different pipeline runs.\n",
    "        # data=data_input,\n",
    "        data=pipeline_input_data,\n",
    "        # Declare which column of input data should be mapped to flow input\n",
    "        # the value pattern follows ${data.<column_name_from_data_input>}\n",
    "        url=\"${data.url}\",\n",
    "        # Provide the connection values of the flow component\n",
    "        # The value of connection and deployment_name should align with your workspace connection settings.\n",
    "        connections={\n",
    "            \"summarize_text_content\": {\n",
    "                \"connection\": \"azure_open_ai_connection\",\n",
    "                \"deployment_name\": \"gpt-35-turbo\",\n",
    "            },\n",
    "            \"classify_with_llm\": {\n",
    "                \"connection\": \"azure_open_ai_connection\",\n",
    "                \"deployment_name\": \"gpt-35-turbo\",\n",
    "            },\n",
    "        },\n",
    "    )\n",
    "\n",
    "    # Provide run settings of your flow component\n",
    "    # Only 'compute' is required and other setting will keep default value if not provided.\n",
    "    flow_node.environment_variables = {\n",
    "        \"PF_INPUT_FORMAT\": \"jsonl\",\n",
    "    }\n",
    "    flow_node.compute = \"cpu-cluster\"\n",
    "    flow_node.resources = {\"instance_count\": parallel_node_count}\n",
    "    flow_node.mini_batch_size = 5\n",
    "    flow_node.max_concurrency_per_instance = 2\n",
    "    flow_node.retry_settings = {\n",
    "        \"max_retries\": 1,\n",
    "        \"timeout\": 1200,\n",
    "    }\n",
    "    flow_node.error_threshold = -1\n",
    "    flow_node.mini_batch_error_threshold = -1\n",
    "    flow_node.logging_level = \"DEBUG\"\n",
    "\n",
    "    # Function return will be treated as pipeline output. This is not required.\n",
    "    return {\"flow_result_folder\": flow_node.outputs.flow_outputs}\n",
    "\n",
    "\n",
    "# create pipeline instance\n",
    "pipeline_job_def = pipeline_func_with_flow(pipeline_input_data=data_input)\n",
    "pipeline_job_def.outputs.flow_result_folder = pipeline_output"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Submit the pipeline job to your workspace then check the status of your job on UI through the link in the output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Submit the pipeline job to your workspace\n",
    "pipeline_job_run = ml_client.jobs.create_or_update(\n",
    "    pipeline_job_def, experiment_name=\"Single_flow_component_pipeline_job\"\n",
    ")\n",
    "pipeline_job_run\n",
    "\n",
    "ml_client.jobs.stream(pipeline_job_run.name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> __NOTE:__</br>\n",
    "> \n",
    "> - The choice of `mini_batch_size` significantly affects the efficiency of the flow job. Since the lines within each mini-batch run sequentially, setting a higher value for this parameter increases the chunk size, which reduces parallelization. On the other hand, larger batch sizes also raise the cost of retries, as retries are based on the entire mini-batch. Conversely, opting for the lowest value (e.g., mini_batch_size=1) may introduce additional overhead, affecting efficiency across multiple mini-batches during orchestration or result summarization. So it is recommended to start with a value between 10 and 100 and fine-tune it later based on your specific requirements.</br>\n",
    "> - The `max_concurrency_per_instance` setting can significantly enhance parallel efficiency within a single compute node. However, it also introduces several potential issues: 1) increase the risk of running out of memory, 2) LLM endpoint may experience throttling when too many requests arrive simultaneously. In general, it is advisable to set the max_concurrency_per_instance number equal to the core count of your compute to strike a balance between parallelism and resource constraints. </br>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 3.2.2 Run complex pipeline with multiple component\n",
    "In a typical pipeline, you’ll find multiple steps that encompass all your offline business requirements. If you’re aiming to construct a more intricate pipeline for production, explore the following resources:\n",
    " - [how to create component with SDK v2](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-create-component-pipeline-python?view=azureml-api-2)\n",
    " - Various component types:\n",
    "    - [Command](https://learn.microsoft.com/en-us/azure/machine-learning/reference-yaml-component-command?view=azureml-api-2)\n",
    "    - [Spark](https://learn.microsoft.com/en-us/azure/machine-learning/reference-yaml-component-spark?view=azureml-api-2)\n",
    "    - [Pipeline](https://learn.microsoft.com/en-us/azure/machine-learning/reference-yaml-component-pipeline?view=azureml-api-2)\n",
    "\n",
    "\n",
    "Additionally, consider the following sample code that loads two extra command components from a repository to construct a single offline pipeline:\n",
    " - __data_prep_component__ : This dummy data preprocessing step performs simple data sampling.\n",
    " - __result_parser_component__: Combining source data, flow results, and debugging output, it generates a single file containing origin queries, LLM predictions, and LLM token usages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# load Azure ML components\n",
    "data_prep_component = load_component(\"./components/data-prep/data-prep.yaml\")\n",
    "result_parser_component = load_component(\n",
    "    \"./components/result-parser/result-parser.yaml\"\n",
    ")\n",
    "\n",
    "# load flow as component\n",
    "flow_component = load_component(\"../../flows/standard/web-classification/flow.dag.yaml\")\n",
    "\n",
    "\n",
    "@pipeline()\n",
    "def pipeline_func_with_flow(pipeline_input_data):\n",
    "    data_prep_node = data_prep_component(\n",
    "        input_data_file=pipeline_input_data,\n",
    "    )\n",
    "    data_prep_node.compute = \"cpu-cluster\"\n",
    "\n",
    "    flow_node = flow_component(\n",
    "        # Feed the output of data_prep_node to the flow component\n",
    "        data=data_prep_node.outputs.output_data_folder,\n",
    "        url=\"${data.url}\",\n",
    "        connections={\n",
    "            \"summarize_text_content\": {\n",
    "                \"connection\": \"azure_open_ai_connection\",\n",
    "                \"deployment_name\": \"gpt-35-turbo\",\n",
    "            },\n",
    "            \"classify_with_llm\": {\n",
    "                \"connection\": \"azure_open_ai_connection\",\n",
    "                \"deployment_name\": \"gpt-35-turbo\",\n",
    "            },\n",
    "        },\n",
    "    )\n",
    "\n",
    "    flow_node.environment_variables = {\"PF_INPUT_FORMAT\": \"csv\"}\n",
    "    flow_node.compute = \"cpu-cluster\"\n",
    "    flow_node.mini_batch_size = 5\n",
    "    flow_node.max_concurrency_per_instance = 2\n",
    "    flow_node.resources = {\"instance_count\": 1}\n",
    "    flow_node.outputs.flow_outputs.mode = \"rw_mount\"\n",
    "    flow_node.logging_level = \"DEBUG\"\n",
    "\n",
    "    result_parser_node = result_parser_component(\n",
    "        source_data=data_prep_node.outputs.output_data_folder,\n",
    "        pf_output_data=flow_node.outputs.flow_outputs,\n",
    "        pf_debug_data=flow_node.outputs.debug_info,\n",
    "    )\n",
    "\n",
    "    flow_node.retry_settings = {\n",
    "        \"max_retries\": 1,\n",
    "        \"timeout\": 6000,\n",
    "    }\n",
    "\n",
    "    result_parser_node.compute = \"cpu-cluster\"\n",
    "\n",
    "    return {\"flow_result_folder\": result_parser_node.outputs.merged_data}\n",
    "\n",
    "\n",
    "# create pipeline instance\n",
    "pipeline_job_def = pipeline_func_with_flow(pipeline_input_data=data_input)\n",
    "pipeline_job_def.outputs.flow_result_folder = pipeline_output"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Submit the pipeline job to your workspace then check the status of your job on UI through the link in the output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# submit job to workspace\n",
    "pipeline_job_run = ml_client.jobs.create_or_update(\n",
    "    pipeline_job_def, experiment_name=\"Complex_flow_component_pipeline_job\"\n",
    ")\n",
    "pipeline_job_run\n",
    "\n",
    "ml_client.jobs.stream(pipeline_job_run.name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 4.1 Next step - Setup scheduler for your pipeline\n",
    "\n",
    "Azure Machine Learning pipelines support native __scheduler__ to help users regularly run their pipeline jobs with predefined time triggers. Here’s a code example for setting up a scheduler on a newly created pipeline using the flow component.\n",
    "\n",
    "Let’s begin by declaring a scheduler with a customized recurrence pattern."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "from azure.ai.ml.entities import JobSchedule, RecurrenceTrigger, RecurrencePattern\n",
    "from azure.ai.ml.constants import TimeZone\n",
    "\n",
    "schedule_name = \"simple_sdk_create_schedule_recurrence\"\n",
    "schedule_start_time = datetime.utcnow()\n",
    "\n",
    "recurrence_trigger = RecurrenceTrigger(\n",
    "    frequency=\"day\",  # could accept \"hour\", \"minute\", \"day\", \"week\", \"month\"\n",
    "    interval=1,\n",
    "    schedule=RecurrencePattern(hours=10, minutes=[0, 1]),\n",
    "    start_time=schedule_start_time,\n",
    "    time_zone=TimeZone.UTC,\n",
    ")\n",
    "\n",
    "job_schedule = JobSchedule(\n",
    "    name=schedule_name,\n",
    "    trigger=recurrence_trigger,\n",
    "    # Declare the pipeline job to be scheduled. Here we uses the pipeline job created in previous example.\n",
    "    create_job=pipeline_job_def,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To initiate the scheduler, follow this example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_schedule = ml_client.schedules.begin_create_or_update(\n",
    "    schedule=job_schedule\n",
    ").result()\n",
    "print(job_schedule)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To review all your scheduled jobs, navigate to the __Job List__ page within the Azure Machine Learning workspace UI. Any job triggered by the scheduler will have a display name following this format: `<schedule_name>-<trigger_time>`. For instance, if you have a schedule named “named-schedule,” a job triggered on January 1, 2021, at 06:00:00 UTC will have the display name “named-schedule-20210101T060000Z.”\n",
    "\n",
    "To disable or shut down a running scheduler, follow this example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_schedule = ml_client.schedules.begin_disable(name=schedule_name).result()\n",
    "job_schedule.is_enabled"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To explore further details about scheduling Azure Machine Learning pipeline jobs, visit this article on [how to schedule pipeline job](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-schedule-pipeline-job?view=azureml-api-2&tabs=python)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 4.2 Next step - Deploy pipeline to an endpoint\n",
    "Azure Machine Learning also offers __batch endpoints__, which enable you to deploy pipelines to an endpoint for efficient operationalization. If you require scheduling for your flow pipeline using an external orchestrator, such as Azure Data Factory or Microsoft Fabric, utilizing batch endpoints is the optimal recommendation for your flow pipeline.\n",
    "\n",
    "Let’s start by creating a new batch endpoint in your workspace."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from azure.ai.ml.entities import BatchEndpoint, PipelineComponentBatchDeployment\n",
    "\n",
    "# from azure.ai.ml.entities import ModelBatchDeployment, ModelBatchDeploymentSettings, Model, AmlCompute, Data, BatchRetrySettings, CodeConfiguration, Environment, Data\n",
    "# from azure.ai.ml.constants import BatchDeploymentOutputAction\n",
    "\n",
    "\n",
    "endpoint_name = \"hello-my-pipeline-endpoint\"\n",
    "endpoint = BatchEndpoint(\n",
    "    name=endpoint_name,\n",
    "    description=\"A hello world endpoint for pipeline\",\n",
    ")\n",
    "\n",
    "ml_client.batch_endpoints.begin_create_or_update(endpoint).result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each endpoint can support multiple deployments, each associated with distinct pipelines. In this context, we initiate a new deployment using our flow pipeline job, targeting the recently established endpoint."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "deployment = PipelineComponentBatchDeployment(\n",
    "    name=\"my-pipeline-deployment\",\n",
    "    description=\"A hello world deployment with a pipeline job.\",\n",
    "    endpoint_name=endpoint.name,\n",
    "    # Make sure 'pipeline_job_run' run successfully before deploying the endpoint\n",
    "    job_definition=pipeline_job_run,\n",
    "    settings={\"default_compute\": \"cpu-cluster\", \"continue_on_step_failure\": False},\n",
    ")\n",
    "\n",
    "ml_client.batch_deployments.begin_create_or_update(deployment).result()\n",
    "\n",
    "# Refresh the default deployment to the latest one at our endpoint.\n",
    "endpoint = ml_client.batch_endpoints.get(endpoint.name)\n",
    "endpoint.defaults.deployment_name = deployment.name\n",
    "ml_client.batch_endpoints.begin_create_or_update(endpoint).result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Invoke the default deployment to target endpoint with proper data:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "batch_endpoint_job = ml_client.batch_endpoints.invoke(\n",
    "    endpoint_name=endpoint.name,\n",
    "    inputs={\"pipeline_input_data\": data_input},\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, verify the invocation on the workspace UI using the following link:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ml_client.jobs.get(batch_endpoint_job.name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To explore further details about Azure Machine Learning batch endpoint, visit this article on [how-to-use-batch-pipeline-deployments](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-use-batch-pipeline-deployments?view=azureml-api-2&tabs=python)"
   ]
  }
 ],
 "metadata": {
  "description": "Create pipeline using components to run a distributed job with tensorflow",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  },
  "resources": "examples/flows/standard/web-classification"
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
